<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script><!--#include virtual="../../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
    <!-- h1>Developer Guide for Kafka Streams</h1 -->
    <div class="sub-nav-sticky">
        <div class="sticky-top">
            <!-- div style="height:35px">
              <a href="/{{version}}/documentation/streams/">Introduction</a>
              <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
              <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
              <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
              <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
            </div -->
        </div>
    </div>

	<div class="section" id="naming">
        <span id="streams-developer-guide-dsl-topology-naming"></span>
        <h1>Naming Operators in a Kafka Streams DSL Application<a class="headerlink" href="#naming" title="Permalink to this headline"></a></h1>

        <p>
		   You now can give names to processors when using the Kafka Streams DSL.
		   In the PAPI there are <code>Processors</code> and <code>State Stores</code> and
		   you are required to explicitly name each one.
	    </p>
		<p>
		   At the DLS layer, there are operators.  A single DSL operator may
		   compile down to multiple <code>Processors</code> and <code>State Stores</code>, and
		   if required <code>repartition topics</code>. But with the Kafka Streams
		   DSL, all these names are generated for you. There is a relationship between
		   the generated processor name state store names (hence changelog topic names) and repartition
		   topic names. Note, that the names of state stores and changelog/repartition topics
		   are "stateful" while processor names are "stateless".
	    </p>
		<p>
			This distinction
		   of stateful vs. stateless names has important implications when updating your topology.
		   While the internal naming makes creating
		   a topology with the DSL much more straightforward,
		   there are a couple of trade-offs.  The first trade-off is what we could
		   consider a readability issue. The other
		   more severe trade-off is the shifting of names due to the relationship between the
		   DSL operator and the generated <code>Processors</code>, <code>State Stores</code> changelog
		   topics and repartition topics.
	   </p>


		<h2>Readability Issues</h2>

		<p>
		 	By saying there is a readability trade-off, we are referring to viewing a description of the topology.
		 	When you render the string description of your topology via the <code>Topology#desribe()</code>
		 	method, you can see what the processor is, but you don't have any context for its business purpose.
		 	For example, consider the following simple topology:

			<br/>
		<pre>
		KStream&lt;String,String&gt; stream = builder.stream("input");
		stream.filter((k,v) -> !v.equals("invalid_txn"))
		      .mapValues((v) -> v.substring(0,5))
		      .to("output")
	    </pre>

		</p>

		<p>
		Running <code>Topology#describe()</code> yields this string:

		<pre>
		Topologies:
		   Sub-topology: 0
		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
		      --> KSTREAM-FILTER-0000000001
		    Processor: KSTREAM-FILTER-0000000001 (stores: [])
		      --> KSTREAM-MAPVALUES-0000000002
		      <-- KSTREAM-SOURCE-0000000000
		    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
		      --> KSTREAM-SINK-0000000003
		      <-- KSTREAM-FILTER-0000000001
		    Sink: KSTREAM-SINK-0000000003 (topic: output)
		      <-- KSTREAM-MAPVALUES-0000000002
		 </pre>

		 From this report, you can see what the different operators are, but what is the broader context here?
		 For example, consider <code>KSTREAM-FILTER-0000000001</code>, we can see that it's a
		 filter operation, which means that records are dropped that don't match the given predicate.  But what is
		 the meaning of the predicate?  Additionally, you can see the topic names of the source and sink nodes,
		 but what if the topics aren't named in a meaningful way?  Then you're left to guess the
		 business purpose behind these topics.
		</p>
		<p>
		 Also notice the numbering here: the source node is suffixed with <code>0000000000</code>
		 indicating it's the first processor in the topology.  
		 The filter is suffixed with <code>0000000001</code>, indicating it's the second processor in
		 the topology.   In Kafka Streams, there are now overloaded methods for
		 both <code>KStream</code> and <code>KTable</code> that accept
		 a new parameter <code>Named</code>. By using the <code>Named</code> class DSL users can
		 provide meaningful names to the processors in their topology.
		</p>
		<p>
		 Now let's take a look at your topology with all the processors named:
		<pre>
		KStream&lt;String,String&gt; stream =
		builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
		stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
		      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
		      .to("output", Produced.as("Mapped_transactions_output_topic"));
	     </pre>

		 <pre>
		 Topologies:
		   Sub-topology: 0
		    Source: Customer_transactions_input_topic (topics: [input])
		      --> filter_out_invalid_txns
		    Processor: filter_out_invalid_txns (stores: [])
		      --> Map_values_to_first_6_characters
		      <-- Customer_transactions_input_topic
		    Processor: Map_values_to_first_6_characters (stores: [])
		      --> Mapped_transactions_output_topic
		      <-- filter_out_invalid_txns
		    Sink: Mapped_transactions_output_topic (topic: output)
		      <-- Map_values_to_first_6_characters
		 </pre>

		Now you can look at the topology description and easily understand what role each processor
		plays in the topology. But there's another reason for naming your processor nodes when you
		have stateful operators that remain between restarts of your Kafka Streams applications, 
		state stores, changelog topics, and repartition topics.
		</p>

		<h2>Changing Names</h2>
		<p>
		 Generated names are numbered where they are built in the topology.
		 The name generation strategy is
		 <code>KSTREAM|KTABLE-&gt;operator name&lt;-&gt;number suffix&lt;</code>. The number is a
		 globally incrementing number that represents the operator's order in the topology.
		 The generated number is prefixed with a varying number of "0"s to create a
		 string that is consistently 10 characters long.
		 This means that if you add/remove or shift the order of operations, the position of the
		 processor shifts, which shifts the name of the processor.  Since <strong>most</strong> processors exist
		 in memory only, this name shifting presents no issue for many topologies.  But the name
		 shifting does have implications for topologies with stateful operators or repartition topics. 

		 Here's a different topology with some state:
		 <pre>
		 KStream&lt;String,String&gt; stream = builder.stream("input");
		 stream.groupByKey()
		       .count()
		       .toStream()
		       .to("output");
		 </pre>
		This topology description yields the following:
		 <pre>
			 Topologies:
			   Sub-topology: 0
			    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
			     --> KSTREAM-AGGREGATE-0000000002
			    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
			     --> KTABLE-TOSTREAM-0000000003
			     <-- KSTREAM-SOURCE-0000000000
			    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
			     --> KSTREAM-SINK-0000000004
			     <-- KSTREAM-AGGREGATE-0000000002
			    Sink: KSTREAM-SINK-0000000004 (topic: output)
			     <-- KTABLE-TOSTREAM-0000000003
		 </pre>
		 </p>
		 <p>
		  You can see from the topology description above that the state store is named
		  <code>KSTREAM-AGGREGATE-STATE-STORE-0000000002</code>.  Here's what happens when you
		  add a filter to keep some of the records out of the aggregation:
		  <pre>
		   KStream&lt;String,String&gt; stream = builder.stream("input");
		   stream.filter((k,v)-> v !=null && v.length() >= 6 )
		         .groupByKey()
		         .count()
		         .toStream()
		         .to("output");
		  </pre>

		  And the corresponding topology:
		  <pre>
			  Topologies:
			    Sub-topology: 0
			     Source: KSTREAM-SOURCE-0000000000 (topics: [input])
			      --> KSTREAM-FILTER-0000000001
			     Processor: KSTREAM-FILTER-0000000001 (stores: [])
			       --> KSTREAM-AGGREGATE-0000000003
			       <-- KSTREAM-SOURCE-0000000000
			     Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
			       --> KTABLE-TOSTREAM-0000000004
			       <-- KSTREAM-FILTER-0000000001
			     Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
			       --> KSTREAM-SINK-0000000005
			       <-- KSTREAM-AGGREGATE-0000000003
			      Sink: KSTREAM-SINK-0000000005 (topic: output)
			       <-- KTABLE-TOSTREAM-0000000004
		  </pre>
		 </p>
		<p>
		Notice that since you've added an operation <em>before</em> the <code>count</code> operation, the state
		  store (and the changelog topic) names have changed.  This name change means you can't
		  do a rolling re-deployment of your updated topology.  Also, you must use the
		  <a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a> 
		  to re-calculate the aggregations, because the changelog topic has changed on start-up and the
		  new changelog topic contains no data.

		  Fortunately, there's an easy solution to remedy this situation.  Give the
		  state store a user-defined name instead of relying on the generated one,
		  so you don't have to worry about topology changes shifting the name of the state store.

		  You've had the ability to name repartition topics with the <code>Joined</code>,
		  <code>StreamJoined</code>, and<code>Grouped</code> classes, and
		  name state store and changelog topics with <code>Materialized</code>.
		  But it's worth reiterating the importance of naming these DSL topology operations again.

		  Here's how your DSL code looks now giving a specific name to your state store:
		  <pre>
		  KStream&lt;String,String&gt; stream = builder.stream("input");
		  stream.filter((k, v) -> v != null && v.length() >= 6)
		        .groupByKey()
		        .count(Materialized.as("Purchase_count_store"))
		        .toStream()
		        .to("output");
		  </pre>

		  And here's the topology 

		  <pre>
		  Topologies:
		   Sub-topology: 0
		    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
		      --> KSTREAM-FILTER-0000000001
		    Processor: KSTREAM-FILTER-0000000001 (stores: [])
		      --> KSTREAM-AGGREGATE-0000000002
		      <-- KSTREAM-SOURCE-0000000000
		    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
		      --> KTABLE-TOSTREAM-0000000003
		      <-- KSTREAM-FILTER-0000000001
		    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
		      --> KSTREAM-SINK-0000000004
		      <-- KSTREAM-AGGREGATE-0000000002
		    Sink: KSTREAM-SINK-0000000004 (topic: output)
		      <-- KTABLE-TOSTREAM-0000000003
		  </pre>
		</p>
		<p>
		  Now, even though you've added processors before your state store, the store name and its changelog
		  topic names don't change.  This makes your topology more robust and resilient to changes made by
		  adding or removing processors.
		</p>

		<h2>Conclusion</h2>

		 It's a good practice to name your processing nodes when using the DSL, and it's even
		 more important to do this when you have "stateful" processors
		your application such as repartition
		 topics and state stores (and the accompanying changelog topics).
        <p>
		 Here are a couple of points to remember when naming your DSL topology:
		<ol>
			<li>
				If you have an <em>existing topology</em> and you <em>haven't</em> named your
				state stores (and changelog topics) and repartition topics, we recommended that you
				do so.  But this will be a topology breaking change, so you'll need to shut down all
				application instances, make the changes, and run the
				<a href="/{{version}}/documentation/streams/developer-guide/app-reset-tool">Streams Reset Tool</a>.
				Although this may be inconvenient at first, it's worth the effort to protect your application from
				unexpected errors due to topology changes.
			</li>
			<li>
				If you have a <em>new topology</em>, make sure you name the persistent parts of your topology:
				state stores (changelog topics) and repartition topics. This way, when you deploy your
				application, you're protected from topology changes that otherwise would break your Kafka Streams application.
				If you don't want to add names to stateless processors at first, that's fine as you can
				always go back and add the names later.
			</li>
		</ol>

		  Here's a quick reference on naming the critical parts of
		  your Kafka Streams application to prevent topology name changes from breaking your application:

		  <table>
		      <tr>
		          <th>Operation</th><th>Naming Class</th>
		      </tr>
		      <tr>
		          <td>Aggregation repartition topics</td><td>Grouped</td>
		      </tr>
		      <tr>
		          <td>KStream-KStream Join repartition topics</td><td>StreamJoined</td>
		      </tr>
			  <tr>
				  <td>KStream-KTable Join repartition topic</td><td>Joined</td>
			  </tr>
		      <tr>
		          <td>KStream-KStream Join state stores</td><td>StreamJoined</td>
		      </tr>
		      <tr>
		          <td>State Stores (for aggregations and KTable-KTable joins)</td><td>Materialized</td>
		      </tr>
		      <tr>
		          <td>Stream/Table non-stateful operations</td><td>Named</td>
		      </tr>
		  </table>
	</p>
</div>

</script>

<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation documentation--current">
	<!--#include virtual="../../../includes/_nav.htm" -->
	<div class="right">
		<!--#include virtual="../../../includes/_docs_banner.htm" -->
		<ul class="breadcrumbs">
			<li><a href="/documentation">Documentation</a></li>
			<li><a href="/documentation/streams">Kafka Streams</a></li>
			<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
		</ul>
		<div class="p-content"></div>
	</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
	$(function () {
		// Show selected style on nav item
		$('.b-nav__streams').addClass('selected');

		//sticky secondary nav
		var $navbar = $(".sub-nav-sticky"),
				y_pos = $navbar.offset().top,
				height = $navbar.height();

		$(window).scroll(function () {
			var scrollTop = $(window).scrollTop();

			if (scrollTop > y_pos - height) {
				$navbar.addClass("navbar-fixed")
			} else if (scrollTop <= y_pos) {
				$navbar.removeClass("navbar-fixed")
			}
		});

		// Display docs subnav items
		$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
	});
</script>




